package cn.jdemo.core.flink;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.util.Collector;

import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;

public class BatchExe {
    /**
     * Flink batch job: reads a small in-memory collection of comma-separated
     * word lists, splits each line into individual words, removes duplicates,
     * and writes the result as {@code word.txt} next to the {@code words.txt}
     * classpath resource.
     *
     * @param args unused
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<String> data = new ArrayList<>();
        data.add("hadoop,hive");
        data.add("hadoop,spark");
        data.add("hadoop,flink");

        DataSource<String> dataSource = env.fromCollection(data);

        // An anonymous class (rather than a lambda) keeps Flink's type
        // extraction working for the Collector<String> output type.
        MapPartitionOperator<String, String> outputDataSource = dataSource.mapPartition(new MapPartitionFunction<String, String>() {
            @Override
            public void mapPartition(Iterable<String> lines, Collector<String> collector) throws Exception {
                // Emit each comma-separated token of every line as its own record.
                for (String line : lines) {
                    for (String word : line.split(",")) {
                        collector.collect(word);
                    }
                }
            }
        });

        // Resolve the output directory from the words.txt classpath resource;
        // fail fast with a clear message instead of an NPE if it is absent.
        URL resource = BatchExe.class.getClassLoader().getResource("words.txt");
        if (resource == null) {
            throw new IllegalStateException("classpath resource 'words.txt' not found");
        }
        String path = resource.getPath();
        System.out.println(path);

        // BUG FIX: distinct(0) selects a key by field position, which the
        // DataSet API only supports on Tuple types; on a DataSet<String> it
        // fails at job-graph construction. distinct() compares the whole
        // String element, which is the intended semantics here.
        outputDataSource.distinct().writeAsText(path.substring(0, path.lastIndexOf("/")) + "/word.txt");

        // writeAsText is a lazy sink; execute() actually runs the job.
        env.execute(BatchExe.class.getName());
    }
}
